jsoup springboot爬虫demo
package com.zhy.springboot.dataSource; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.SqlSessionTemplate; import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.core.io.DefaultResourceLoader; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import com.alibaba.druid.pool.DruidDataSource; @Configuration @MapperScan(basePackages = "com.zhy.springboot.mapper.base", sqlSessionTemplateRef = "baseSqlSessionTemplate") public class BaseDataSourceConfig { @Bean(name = "baseDataSource") @ConfigurationProperties(prefix="spring.datasource.base") @Primary//标记基本库,为了有多个实例一样的时候以这个为主 public DruidDataSource setDataSource() { return new DruidDataSource(); } @Bean(name="baseTransactionManager") @Primary public DataSourceTransactionManager setTransactionManager(@Qualifier("baseDataSource") DruidDataSource dataSource) { return new DataSourceTransactionManager(dataSource); } @Bean(name = "baseSqlSessionFactory") @Primary public SqlSessionFactory setSqlSessionFactory(@Qualifier("baseDataSource") DruidDataSource dataSource) throws Exception { SqlSessionFactoryBean bean = new SqlSessionFactoryBean(); bean.setDataSource(dataSource); bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:sqlmaps/base/*.xml")); bean.setConfigLocation(new DefaultResourceLoader().getResource("classpath:mybatis-config.xml")); bean.setTypeAliasesPackage("com.zhy.springboot.model"); /* org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session .Configuration(); configuration.setMapUnderscoreToCamelCase(true); configuration.setUseGeneratedKeys(true); configuration.setCacheEnabled(true); bean.setConfiguration(configuration);*/ return bean.getObject(); } @Bean(name = "baseSqlSessionTemplate") @Primary public SqlSessionTemplate setSqlSessionTemplate(@Qualifier("baseSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception { return new SqlSessionTemplate(sqlSessionFactory); }}
package com.zhy.springboot.utils; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.Stack; import org.apache.commons.lang3.StringUtils; import org.elasticsearch.action.deletebyquery.DeleteByQueryAction; import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder; import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.MultiSearchRequestBuilder; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.MatchPhraseQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.zhy.springboot.model.EsSearchVo; @SuppressWarnings("resource") public class ElasticSearchUtils { private static Logger log = LoggerFactory.getLogger(ElasticSearchUtils.class); public static final String CLUSTER_NAME = "es-zhy"; //实例名称 private static final String IP = "127.0.0.1"; private static final int PORT = 9300; //端口 private static TransportClient client; //取得实例 public static TransportClient getTransportClient(){ if(client!=null) { return client; }else { try { Settings settings = Settings.builder() .put("cluster.name",CLUSTER_NAME).put("client.transport.sniff", true) //启动集群嗅探 .build(); client = new PreBuiltTransportClient(settings) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(IP), PORT)); } catch (UnknownHostException e) { log.error(">>>>>>>>>ES连接初始化失败<<<<<<<<<", e); client = null; } } return client; } /** * 创建索引 * @param indexName 索引名称,相当于数据库名称 * @param typeName 索引类型,相当于数据库中的表名 * @param id id名称,相当于每个表中某一行记录的标识 * @param jsonData json数据 */ @SuppressWarnings("deprecation") public static void createIndex(String indexName, String typeName, String id, String jsonData) { TransportClient transportClient = getTransportClient(); if(transportClient!=null) { IndexRequestBuilder requestBuilder = transportClient.prepareIndex(indexName, typeName, id).setSource(jsonData); requestBuilder.execute().actionGet(); }else { log.error(">>>>>>>>>ES连接初始化失败,创建索引失败<<<<<<<<<"); } } /** * 执行搜索 * @param indexname 索引名称 * @param type 索引类型 * @param queryBuilder 查询条件 * @return */ public static SearchResponse searcher(String indexName, String typeName, QueryBuilder queryBuilder) { TransportClient transportClient = getTransportClient(); SearchResponse searchResponse = transportClient.prepareSearch(indexName) .setTypes(typeName).setQuery(queryBuilder).execute() .actionGet();//执行查询 return searchResponse; } /** * 根据查询数据更新 * @param indexName 索引名称 * @param typeName 索引类型 * @param jsonData json数据 */ public static void updateByQuery(String indexName, String typeName, String jsonData) { try { TransportClient transportClient = getTransportClient(); JSONObject jsonObject = JSON.parseObject(jsonData); if(jsonObject!=null&&jsonObject.size()>0) { Set<String> keySet = jsonObject.keySet(); List<SearchRequestBuilder> dataLst = new ArrayList<>(); for (String key : keySet) { if(key.equals("itemId")||key.equals("itemKey")||key.equals("triggerId")||key.equals("hostName")||key.equals("hostIp")) { QueryBuilder builder = QueryBuilders.matchQuery(key, jsonObject.get(key)); SearchRequestBuilder search = transportClient.prepareSearch(indexName) .setTypes(typeName) .setQuery(builder); dataLst.add(search); } } if(dataLst.size()!=5) { log.error(">>>>>>>>>Update Data is Not Right,Pleace Check Out Data Fileds!!!<<<<<<<<<"); }else { MultiSearchRequestBuilder mrb = transportClient.prepareMultiSearch() .add(dataLst.get(0)).add(dataLst.get(1)).add(dataLst.get(2)).add(dataLst.get(3)).add(dataLst.get(4)) ; MultiSearchResponse multiSearchResponse =mrb.execute().actionGet(); if(multiSearchResponse!=null) { for (MultiSearchResponse.Item item : multiSearchResponse.getResponses()) {SearchResponse searchResponse = item.getResponse();SearchHits hits = searchResponse.getHits(); if (null == hits || hits.getTotalHits() == 0) { log.error("ES未查询到任何结果!!!");} else { for (SearchHit hit : hits) { //updateIndex(indexName,typeName,hit.getId(),string); System.out.println(hit.getSourceAsString()); } } } }else { log.error("ES未查询到任何结果!!!"); } } } } catch (Exception e) { log.error("ES Query Search Failed!!!",e); } } /** * 根据查询数据更新 * @param indexName 索引名称 * @param typeName 索引类型 * @param flag * @param jsonData json数据 */ @SuppressWarnings({"rawtypes"}) public static void updateByQuery2(String indexName, String typeName, Map<String,Object> dataMap, boolean flag) { try { TransportClient transportClient = getTransportClient(); JSONObject jsonObject = (JSONObject) JSONObject.toJSON(dataMap); if(jsonObject!=null&&jsonObject.size()>0) { Set<String> keySet = jsonObject.keySet(); List<MatchPhraseQueryBuilder > dataLst = new ArrayList<>(); for (String key : keySet) { if(key.equals("itemId")||key.equals("itemKey")||key.equals("triggerId")||key.equals("hostName")||key.equals("hostIp")) { dataLst.add(QueryBuilders.matchPhraseQuery(key,jsonObject.get(key))); } } if(dataLst.size()!=5) { log.error(">>>>>>>>>Update Data is Not Right,Pleace Check Out Data Fileds!!!<<<<<<<<<"); }else { QueryBuilder qb = QueryBuilders.boolQuery().must(dataLst.get(0)) .must(dataLst.get(1)) .must(dataLst.get(2)) .must(dataLst.get(3)) .must(dataLst.get(4)); SearchRequestBuilder responsebuilder = transportClient.prepareSearch(indexName).setTypes(typeName); SearchResponse myresponse=responsebuilder.setQuery(qb).execute().actionGet(); SearchHits hits = myresponse.getHits(); for (int i = 0; i < hits.getHits().length; i ) { if (null == hits.getHits()[i]) { log.error("ES未查询到任何结果!!!");} else { if(flag) { log.info("***************ES Match Data Length Is:" hits.getTotalHits() "****************"); Map map = JSONObject.parseObject(hits.getHits()[i].getSourceAsString(),Map.class); if(map.get("eventId")!=null&&StringUtils.isNotEmpty(map.get("eventId").toString())) { dataMap.put("eventId",Integer.valueOf(map.get("eventId").toString()) 1); updateIndex(indexName,typeName,hits.getHits()[i].getId(),JSON.toJSONString(dataMap)); } }else { updateIndex(indexName, typeName, hits.getHits()[i].getId(), JSON.toJSONString(dataMap)); } } } } } } catch (Exception e) { log.error("ES Query Search Failed!!!",e); } } /** * 更新索引 * @param indexName 索引名称 * @param typeName 索引类型 * @param id id名称 * @param jsonData json数据 */ @SuppressWarnings("deprecation") public static void updateIndex(String indexName, String typeName, String id, String jsonData) { TransportClient transportClient = getTransportClient(); UpdateRequestBuilder updateRequest = transportClient.prepareUpdate(indexName, typeName, id).setDoc(jsonData); updateRequest.execute().actionGet(); } @SuppressWarnings("deprecation") public static void updateIndexNew(String indexName, String typeName, String id, Map<String,Object> dataMap) { TransportClient transportClient = getTransportClient(); UpdateRequestBuilder updateRequest = transportClient.prepareUpdate(indexName, typeName, id).setDoc(dataMap); updateRequest.execute().actionGet(); } /** * 删除指定索引 * @param indexName * @param typeName * @param id */ public static void deleteIndex(String indexName, String typeName, String id) { TransportClient transportClient = getTransportClient(); transportClient.prepareDelete(indexName, typeName, id).get(); } /** * 判断一个index中的type是否有数据 * @param index * @param type * @return * @throws Exception */ public static Boolean existDocOfType(String index, String type) throws Exception { SearchRequestBuilder builder = client.prepareSearch(index).setTypes(type) .setSearchType(SearchType.QUERY_THEN_FETCH) .setSize(1); SearchResponse response = builder.execute().actionGet(); long docNum = response.getHits().getTotalHits(); if (docNum == 0) { return false; } return true; } /** * 根据type来删除数据 * @param index * @param types * @return */ public static long deleteDocByType(String index, String[] types) { TransportClient transportClient = getTransportClient(); long oldTime = System.currentTimeMillis(); StringBuilder b = new StringBuilder(); b.append("{\"query\":{\"match_all\":{}}}"); DeleteByQueryResponse response = new DeleteByQueryRequestBuilder(transportClient, DeleteByQueryAction.INSTANCE) .setIndices(index).setTypes(types) .setSource(b.toString()) .execute().actionGet(); Stack<String> allTypes = new Stack<String>(); for(String type : types){ allTypes.add(type); } while(!allTypes.isEmpty()){ String type = allTypes.pop(); while(true){ try { if (existDocOfType(index, type) == false) { break; } } catch (Exception e) { log.error("queryError: " e.getMessage()); } } } System.out.println(System.currentTimeMillis() - oldTime); return response.getTotalDeleted(); } /** * 分页查询 * @param index * @param types * @return */ public static EsSearchVo findByPage(String indexName, Integer size,Integer pageNum,String sortField,QueryBuilder queryBuilder) { EsSearchVo vo = new EsSearchVo(); TransportClient transportClient = getTransportClient(); SearchResponse res = null; SearchResponse searchResponse = transportClient.prepareSearch(indexName) .setQuery(queryBuilder) .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)//这种方式返回的document与用户要求的size是相等的。 .setSize(size) .addSort(sortField, SortOrder.DESC) .setExplain(true)// 设置是否按查询匹配度排序 .setScroll(new TimeValue(20000)).execute() //设置TimeValue表示需要保持搜索的上下文时间。 .actionGet();//注意:首次搜索已经包含数据 //获取总数量 long totalCount = searchResponse.getHits().getTotalHits(); vo.setTotal(totalCount); int page = 0; int pageCount=0; if(totalCount%size==0) { pageCount = (int)totalCount/(size); }else { pageCount = (int)totalCount/(size) 1; } if(totalCount<size) { page = 1; }else { page=(int)totalCount/(size); } log.info("*************************ES Page Query Size Number is:" pageCount "************************"); log.info("*************************ES Page Query Match Data Number is:" totalCount "************************"); for (int i = 1; i <=page; i ) { if(pageNum-1==0) { res = searchResponse; break; }else if(pageNum-1==i){ //再次发送请求,并使用上次搜索结果的ScrollId res = transportClient.prepareSearchScroll(searchResponse.getScrollId()) .setScroll(new TimeValue(20000)).execute() .actionGet(); break; }else { searchResponse = transportClient.prepareSearchScroll(searchResponse.getScrollId()) .setScroll(new TimeValue(20000)).execute() .actionGet(); } } vo.setSr(res); return vo; } /** * 分页查询 * @param index * @param types * @return */ public static void findAllBySize(String indexName, Integer size,String sortField) { TransportClient transportClient = getTransportClient(); SearchResponse searchResponse = transportClient.prepareSearch(indexName) .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)//这种方式返回的document与用户要求的size是相等的。 .setSize(size) .addSort(sortField, SortOrder.DESC) .setExplain(true)// 设置是否按查询匹配度排序 .setScroll(new TimeValue(20000)).execute() //设置TimeValue表示需要保持搜索的上下文时间。 .actionGet();//注意:首次搜索已经包含数据 //获取总数量 long totalCount = searchResponse.getHits().getTotalHits(); int page = 0; page=(int)totalCount/(size); System.out.println(page); System.out.println(totalCount); for (int i = 0; i <=page; i ) { if(i==0) { parseSearchResponse(searchResponse); }else { //再次发送请求,并使用上次搜索结果的ScrollId searchResponse = transportClient.prepareSearchScroll(searchResponse.getScrollId()) .setScroll(new TimeValue(20000)).execute() .actionGet(); parseSearchResponse(searchResponse); } } } public static void parseSearchResponse(SearchResponse searchResponse) { SearchHits hits = searchResponse.getHits(); int i = 0; System.out.println("-----------begin------------"); for (SearchHit searchHit : hits.getHits()) { try { i ; //String id = searchHit.getId(); System.out.println("第" i "条数据:" searchHit.getSourceAsString() ); } catch (Exception e) { e.printStackTrace(); } } System.out.println("-----------end------------"); } public static void updateByQueryTemp(String indexName, String typeName, Map<String,Object> dataMap) { try { TransportClient transportClient = getTransportClient(); JSONObject jsonObject = (JSONObject) JSONObject.toJSON(dataMap); if(jsonObject!=null&&jsonObject.size()>0) { Set<String> keySet = jsonObject.keySet(); List<MatchPhraseQueryBuilder > dataLst = new ArrayList<>(); for (String key : keySet) { if(key.equals("itemId")) { dataLst.add(QueryBuilders.matchPhraseQuery(key,jsonObject.get(key))); } } if(dataLst.size()!=1) { log.error(">>>>>>>>>Update Data is Not Right,Pleace Check Out Data Fileds!!!<<<<<<<<<"); }else { QueryBuilder qb = QueryBuilders.boolQuery().must(dataLst.get(0)); SearchRequestBuilder responsebuilder = transportClient.prepareSearch(indexName).setTypes(typeName); SearchResponse myresponse=responsebuilder.setQuery(qb).execute().actionGet(); if(myresponse!=null) { SearchHits hits = myresponse.getHits(); for (int i = 0; i < hits.getHits().length; i ) { if (null == hits.getHits()[i]) { log.error("ES未查询到任何结果!!!"); } else { log.info("ES Match Data Length Is:" hits.getTotalHits()); // log.info("ES Temp Update Data Summary Is:" dataMap.get("summary").toString()); updateIndexNew(indexName,typeName,hits.getHits()[i].getId(),dataMap); } } }else { log.error("ES未查询到任何结果!!!"); } } } } catch (Exception e) { log.error("ES Query Search Failed!!!",e); } } /** * 根据查询数据更新 * @param indexName 索引名称 * @param typeName 索引类型 * @param flag * @param jsonData json数据 */ public static void updateByQueryNotice(String indexName, String typeName, Map<String,Object> dataMap) { try { TransportClient transportClient = getTransportClient(); // String jsonString = JSONObject.toJSONString(dataMap); JSONObject jsonObject = (JSONObject) JSONObject.toJSON(dataMap); if(jsonObject!=null&&jsonObject.size()>0) { Set<String> keySet = jsonObject.keySet(); List<QueryBuilder > dataLst = new ArrayList<>(); for (String key : keySet) { dataLst.add(QueryBuilders.matchPhraseQuery(key,jsonObject.get(key))); } QueryBuilder qb = null; if(dataLst!=null&&dataLst.size()>0) { for (QueryBuilder queryBuilder : dataLst) { } } //IndexResponse myresponse = transportClient.prepareIndex(indexName, typeName).setSource(jsonString, XContentType.JSON).get(); SearchRequestBuilder responsebuilder = transportClient.prepareSearch(indexName).setTypes(typeName); SearchResponse myresponse=responsebuilder.setQuery(qb).execute().actionGet(); if(myresponse!=null) { SearchHits hits = myresponse.getHits(); for (int i = 0; i < hits.getHits().length; i ) { if (null == hits.getHits()[i]) { log.error("ES未查询到任何结果!!!"); } else { System.out.println(hits.getHits()[i].getSourceAsString()); } } }else { log.error("ES未查询到任何结果!!!"); } } } catch (Exception e) { log.error("ES Query Search Failed!!!",e); } }}
评论